package com.atguigu.flink.chapter6;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Author lizhenchao@atguigu.cn
 * @Date 2020/12/21 14:35
 */
public class Flink01_Project_PV {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env
          .readTextFile("input/UserBehavior.csv")
          .map(line -> {
            String[] split = line.split(",");
            return new UserBehavior(Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]), split[3], Long.valueOf(split[4]));
        })
          .filter(behavior -> "pv".equals(behavior.getBehavior()))
          .map(behavior -> Tuple2.of(behavior.getBehavior(), 1L))
          .returns(Types.TUPLE(Types.STRING, Types.LONG))
          .keyBy(t -> t.f0)
          .sum(1)
          .print();

        env.execute();
    }
}
/*
公司的网站的总的pv
一. wordCount思路
    ("pv", 1)

二. reduce

三. process


 */